/*
 * Copyright (c) 2015 The Jupiter Project
 *
 * Licensed under the Apache License, version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.inyourcode.core.transport.session;

import com.inyourcode.core.GlobalConstants;
import com.inyourcode.core.util.Signal;
import com.inyourcode.core.util.StackTraceUtil;
import com.inyourcode.core.transport.api.JConfig;
import com.inyourcode.core.transport.api.JOption;
import com.inyourcode.core.transport.api.Status;
import com.inyourcode.core.transport.api.channel.JChannel;
import com.inyourcode.core.transport.api.exception.IoSignals;
import com.inyourcode.core.transport.api.processor.ProviderProcessor;
import com.inyourcode.core.transport.netty.NettyTcpAcceptor;
import com.inyourcode.core.transport.netty.TcpChannelProvider;
import com.inyourcode.core.transport.netty.channel.NettyChannel;
import com.inyourcode.core.transport.netty.handler.IdleStateChecker;
import com.inyourcode.core.transport.netty.handler.ProtocolDecoder;
import com.inyourcode.core.transport.netty.handler.ProtocolEncoder;
import com.inyourcode.core.transport.netty.handler.acceptor.AcceptorHandler;
import com.inyourcode.core.transport.netty.handler.acceptor.AcceptorIdleStateTrigger;
import com.inyourcode.core.transport.session.head.MessageHeadDecoder;
import com.inyourcode.core.transport.session.head.MessageHeadEncoder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.SocketAddress;
import java.util.concurrent.atomic.AtomicInteger;


/**
 * @author JackLei
 */
public class BasicTcpAcceptor extends NettyTcpAcceptor {

    // handlers
    private final AcceptorIdleStateTrigger idleStateTrigger = new AcceptorIdleStateTrigger();
    private final ProtocolEncoder encoder = new ProtocolEncoder();
    private SessionAcceptorHandler handler;

    public BasicTcpAcceptor(int port) {
        super(port);
    }

    public BasicTcpAcceptor(SocketAddress localAddress) {
        super(localAddress);
    }

    public BasicTcpAcceptor(int port, int nWorks) {
        super(port, nWorks);
    }

    public BasicTcpAcceptor(SocketAddress localAddress, int nWorks) {
        super(localAddress, nWorks);
    }

    public BasicTcpAcceptor(int port, boolean nativeEt) {
        super(port, nativeEt);
    }

    public BasicTcpAcceptor(SocketAddress localAddress, boolean nativeEt) {
        super(localAddress, nativeEt);
    }

    public BasicTcpAcceptor(int port, int nWorks, boolean nativeEt) {
        super(port, nWorks, nativeEt);
    }

    public BasicTcpAcceptor(SocketAddress localAddress, int nWorks, boolean nativeEt) {
        super(localAddress, nWorks, nativeEt);
    }

    @Override
    protected void init() {
        super.init();

        // parent options
        JConfig parent = configGroup().parent();
        parent.setOption(JOption.SO_BACKLOG, 32768);
        parent.setOption(JOption.SO_REUSEADDR, true);

        // child options
        JConfig child = configGroup().child();
        child.setOption(JOption.SO_REUSEADDR, true);

        handler = new SessionAcceptorHandler();
        handler.processor = new BasicProviderProcessor();
    }

    @Override
    public ChannelFuture bind(SocketAddress localAddress) {
        ServerBootstrap boot = bootstrap();

        if (isNativeEt()) {
            boot.channelFactory(TcpChannelProvider.NATIVE_ACCEPTOR);
        } else {
            boot.channelFactory(TcpChannelProvider.NIO_ACCEPTOR);
        }
        boot.childHandler(new ChannelInitializer<SocketChannel>() {

            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(
                        new IdleStateChecker(timer, GlobalConstants.READER_IDLE_TIME_SECONDS, 0, 0),
                        idleStateTrigger,
                        new ProtocolDecoder(),
                        new MessageHeadDecoder(),
                        encoder,
                        new MessageHeadEncoder(),
                        handler);
            }
        });

        setOptions();

        return boot.bind(localAddress);
    }

    @Override
    public void withProcessor(ProviderProcessor processor) {
        throw  new RuntimeException("This method is not supported");
    }

    @ChannelHandler.Sharable
    static class SessionAcceptorHandler extends ChannelInboundHandlerAdapter {

        private static final Logger logger = LoggerFactory.getLogger(AcceptorHandler.class);

        private static final AtomicInteger channelCounter = new AtomicInteger(0);
        private ProviderProcessor processor;

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            if (msg instanceof RequestContext) {
                JChannel jChannel = NettyChannel.attachChannel(ctx.channel());
                try {
                    processor.handleRequest(jChannel, msg);
                } catch (Throwable t) {
                    processor.handleException(jChannel, msg, Status.SERVER_ERROR, t);
                }
            } else {
                logger.warn("Unexpected msg type received:{}.", msg.getClass());

                ReferenceCountUtil.release(msg);
            }
        }

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            int count = channelCounter.incrementAndGet();

            logger.info("Connects with {} as the {}th channel.", ctx.channel(), count);
            processor.channelActive(ctx);
            super.channelActive(ctx);
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            int count = channelCounter.getAndDecrement();

            logger.warn("Disconnects with {} as the {}th channel.", ctx.channel(), count);
            processor.channelInactive(ctx);
            super.channelInactive(ctx);
        }

        @Override
        public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
            Channel ch = ctx.channel();

            // 高水位线: ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK
            // 低水位线: ChannelOption.WRITE_BUFFER_LOW_WATER_MARK
            if (!ch.isWritable()) {
                // 当前channel的缓冲区(OutboundBuffer)大小超过了WRITE_BUFFER_HIGH_WATER_MARK
                logger.warn("{} is not writable, high water mask: {}, the number of flushed entries that are not written yet: {}.",
                        ch, ch.config().getWriteBufferHighWaterMark(), ch.unsafe().outboundBuffer().size());

                ch.config().setAutoRead(false);
            } else {
                // 曾经高于高水位线的OutboundBuffer现在已经低于WRITE_BUFFER_LOW_WATER_MARK了
                logger.warn("{} is writable(rehabilitate), low water mask: {}, the number of flushed entries that are not written yet: {}.",
                        ch, ch.config().getWriteBufferLowWaterMark(), ch.unsafe().outboundBuffer().size());

                ch.config().setAutoRead(true);
            }
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            JChannel jChannel = NettyChannel.attachChannel(ctx.channel());
            if (cause instanceof Signal) {
                IoSignals.handleSignal((Signal) cause, jChannel);
            } else {
                logger.error("An exception has been caught {}, on {}.", StackTraceUtil.stackTrace(cause), jChannel);
            }
        }

        public ProviderProcessor processor() {
            return processor;
        }

        public void processor(ProviderProcessor processor) {
            this.processor = processor;
        }
    }

}
